feature: Support Spark expression: arrays_zip #3643
parthchandra merged 28 commits into apache:main
Conversation
Thanks @comphead
@comphead Thanks for your review. This implementation doesn't use DataFusion for now because I need to pass
```scala
// mimic Spark's ArraysZip behavior: returns NULL if any argument is NULL
val combinedNullCheck = expr.children.map(child => IsNotNull(child)).reduce(And)
val isNotNullExpr = exprToProtoInternal(combinedNullCheck, inputs, binding)
val nullLiteralProto = exprToProto(Literal(null, BooleanType), Seq.empty)
```
The null literal here uses BooleanType, but elsewhere in this file (e.g., CometArrayAppend at line 88) we use the return type of the expression. DataFusion expects all arms of a case/when to have compatible types, so this may cause an error.
```scala
object CometArraysZip extends CometExpressionSerde[ArraysZip] {
  override def getSupportLevel(expr: ArraysZip): SupportLevel = {
    expr.dataType match {
      case _: ArrayType => Compatible()
```
We should probably check the element type here. There have been issues noted in the past; see #1308, for instance.
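The point about checking the element type can be sketched in plain Rust over a toy type enum (an illustration only — `ToyType` and `is_type_supported` are assumptions, not Comet's DataType handling): an array is supported only if its element type is, recursively, so a nested array of an unsupported type is also rejected.

```rust
// Toy model (not Comet code) of a recursive element-type support check.
#[derive(Debug)]
enum ToyType {
    Int,
    Str,
    Binary, // pretend this one is unsupported
    Array(Box<ToyType>),
}

fn is_type_supported(dt: &ToyType) -> bool {
    match dt {
        ToyType::Int | ToyType::Str => true,
        ToyType::Binary => false,
        // an array is supported only if its element type is supported
        ToyType::Array(elem) => is_type_supported(elem),
    }
}
```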
```rust
let fields = self.fields(input_schema)?;
Ok(List(Arc::new(Field::new_list_field(
    DataType::Struct(Fields::from(fields)),
    true,
```
There is a slight mismatch here. Spark has this defined as non-nullable.
```rust
)))
}
ExprStruct::ArraysZip(expr) => {
    assert!(!expr.values.is_empty());
```
Better to return an Err instead of asserting (an assert will cause a panic):

```rust
return Err(GeneralError("arrays_zip requires at least one argument".to_string()));
```

If you want to be extra safe, you can also check that `expr.values.len() == expr.names.len()`.
Makes sense. I fixed the first one. Thanks Parth.
The second check on length makes sense. Spark's ArraysZip does the same check at https://github.com/apache/spark/blob/branch-4.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L313-L315, so I think we are safe here because of Spark.
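The defensive shape discussed here — validating the inputs up front and returning an error instead of panicking — looks roughly like this plain-Rust sketch (`validate_arrays_zip` and its signature are hypothetical, not the Comet function):

```rust
// Hypothetical sketch of up-front validation instead of assert! (names assumed).
fn validate_arrays_zip(values: &[String], names: &[String]) -> Result<(), String> {
    if values.is_empty() {
        return Err("arrays_zip requires at least one argument".to_string());
    }
    if values.len() != names.len() {
        // mirrors the check Spark's ArraysZip performs on children vs. names
        return Err(format!(
            "arrays_zip expects {} field names but got {}",
            values.len(),
            names.len()
        ));
    }
    Ok(())
}
```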
```scala
val inputTypes = expr.children.map(_.dataType).toSet
for (dt <- inputTypes) {
  if (!isTypeSupported(dt)) {
    Unsupported(Some(s"Unsupported child data type: $dt"))
```
```diff
-    Unsupported(Some(s"Unsupported child data type: $dt"))
+    return Unsupported(Some(s"Unsupported child data type: $dt"))
```
Otherwise you're always falling through to Compatible().
Good catch, sorry I missed this one.
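The bug being fixed — building an `Unsupported` value inside the loop but never returning it, so the function always falls through to "compatible" — corresponds to this plain-Rust sketch of the corrected control flow (`get_support_level` over string type names is a hypothetical stand-in, not the Comet API):

```rust
// Sketch (hypothetical names) of the corrected control flow: report the first
// unsupported child type instead of silently falling through to Compatible.
#[derive(Debug, PartialEq)]
enum SupportLevel {
    Compatible,
    Unsupported(String),
}

fn get_support_level(input_types: &[&str], supported: &[&str]) -> SupportLevel {
    for dt in input_types {
        if !supported.contains(dt) {
            // early return: without it, this value would be discarded
            return SupportLevel::Unsupported(format!("Unsupported child data type: {dt}"));
        }
    }
    SupportLevel::Compatible
}
```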
parthchandra left a comment:

LGTM. Some minor nits.
Nice work @hsiang-c
Depends on #4024
Which issue does this PR close?
Closes #3151 and #3575
Rationale for this change
Support the arrays_zip SQL function.

What changes are included in this PR?

- Ported the arrays_zip_inner implementation from DataFusion 53.0.0 to support custom field names in the resulting struct. This also includes the subsequent patches: "feat: correct struct column names for arrays_zip return type" (datafusion#20886) and "fix: arrays_zip/list_zip allow single array argument" (datafusion#21047).
- At the QueryPlanSerde stage, if one of the arrays_zip arguments is NULL, we return NULL as Spark does. Examples of Spark's arrays_zip:

How are these changes tested?
By SQL File Tests, we covered cases such as a single array argument, nested arrays, arrays of supported types, null arguments, and custom field names in the resulting struct. Here is an example of a custom field name:
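The behaviors those tests cover — zipping to the length of the longest array, null-padding shorter ones, and returning NULL when any argument is NULL — can be modeled in plain Rust. This is an illustrative sketch of the semantics only, not the Comet kernel; `arrays_zip_model` is a hypothetical name, and `Option` stands in for SQL NULL at both the argument and element level.

```rust
// Plain-Rust model (not Comet code) of Spark's arrays_zip semantics.
fn arrays_zip_model(args: &[Option<Vec<i32>>]) -> Option<Vec<Vec<Option<i32>>>> {
    if args.iter().any(|a| a.is_none()) {
        return None; // any NULL argument makes the whole result NULL
    }
    let arrays: Vec<&Vec<i32>> = args.iter().map(|a| a.as_ref().unwrap()).collect();
    let max_len = arrays.iter().map(|a| a.len()).max().unwrap_or(0);
    // zip to the longest array, padding shorter ones with null elements
    Some(
        (0..max_len)
            .map(|i| arrays.iter().map(|a| a.get(i).copied()).collect())
            .collect(),
    )
}
```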